 1.车辆监控之Structured Streaming整合Kafka
   
   1).官网介绍
   http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
   Creating a Kafka Source for Streaming Queries
// Subscribe to 1 topic
val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribe", "topic1")
 .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .as[(String, String)]
// Subscribe to multiple topics
val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribe", "topic1,topic2")
 .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .as[(String, String)]
// Subscribe to a pattern
val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribePattern", "topic.*")
 .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .as[(String, String)]
 
   Creating a Kafka Source for Batch Queries
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
 .read
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribe", "topic1")
 .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
 .read
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribe", "topic1,topic2")
 .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
 .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
 .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
 .read
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .option("subscribePattern", "topic.*")
 .option("startingOffsets", "earliest")
 .option("endingOffsets", "latest")
 .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 .as[(String, String)]
 
   注意：读取后的数据的Schema是固定的，包含的列如下：
   Column          Type     说明
   key             binary   消息的key
   value           binary   消息的value
   topic           string   主题
   partition       int      分区
   offset          long     偏移量
   timestamp       long     时间戳
   timestampType   int      类型
   
   注意：下面的参数是不能被设置的，否则kafka会抛出异常：
       group.id:kafka的source会在每次query的时候自动创建唯一的group id
	   auto.offset.reset :为了避免每次手动设置startingoffsets的值，structured streaming在内部消费
时会自动管理offset。这样就能保证订阅动态的topic时不会丢失数据。startingOffsets在流处理
时，只会作用于第一次启动时，之后的处理都会自动的读取保存的offset。
       key.deserializer，value.deserializer，key.serializer，value.serializer 序列化与反序列化，都是
ByteArraySerializer
       enable.auto.commit:Kafka源不支持提交任何偏移量
   2).准备工作
   启动kafka
   向topic中生产数据
   代码实现
package com.lg.test

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * 使用结构化流读取kafka中的数据
 */
object StructuredKafka {
  /**
   * Entry point: reads lines from the Kafka topic "spark_kafka" with
   * Structured Streaming, performs a streaming word count, and prints the
   * running totals (sorted by count, descending) to the console.
   */
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkSession (local mode for this demo).
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(StructuredKafka.getClass.getName)
      .getOrCreate()
    val sc = spark.sparkContext
    // Reduce log noise so the console sink output is readable.
    sc.setLogLevel("WARN")
    import spark.implicits._

    // 2. Define the Kafka source. The resulting DataFrame has the fixed
    //    Kafka schema (key, value, topic, partition, offset, timestamp, ...).
    val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "linux123:9092")
      .option("subscribe", "spark_kafka")
      .load()

    // 3. Transform: the Kafka `value` column is binary, so cast it to string.
    val kafkaValDf: DataFrame = kafkaDF.selectExpr("CAST(value AS STRING)")
    // Convert to a typed Dataset[String] for functional transformations.
    val kafkaDs: Dataset[String] = kafkaValDf.as[String]
    // Split each line into words. NOTE: the original used split("") which
    // splits into individual characters; split(" ") gives a word count.
    val kafkaWordDs: Dataset[String] = kafkaDs.flatMap(_.split(" "))
    // Aggregate: count occurrences of each word, most frequent first.
    val res: Dataset[Row] = kafkaWordDs.groupBy("value").count().sort($"count".desc)

    // 4. Sink: write the complete aggregation result to the console on every
    //    trigger, and block until the streaming query terminates.
    res.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
